# Source code for hysop.operator.redistribute

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Setup for data transfer/redistribution between topologies or operators

.. currentmodule:: hysop.operator.redistribute

* :class:`~Redistribute` generates the optimal set of RedistributeOperatorBase instances
    for one or multiple variables given candidate source topologies and one output topology.
"""

from abc import ABCMeta, abstractmethod
from hysop.constants import DirectionLabels
from hysop.tools.htypes import check_instance, to_set, to_tuple, first_not_None
from hysop.tools.decorators import debug
from hysop.fields.continuous_field import Field
from hysop.topology.topology import Topology
from hysop.topology.cartesian_topology import CartesianTopology
from hysop.core.mpi.redistribute import (
    RedistributeIntra,
    RedistributeInter,
    RedistributeInterParam,
    RedistributeOperatorBase,
)
from hysop.core.graph.node_generator import ComputationalGraphNodeGenerator


class RedistributeNotImplementedError(Exception):
    """Raised when no redistribute operator is found for a variable.

    Within this module it is raised by ``Redistribute._select_redistribute``
    when none of the candidate source topologies yields a compatible
    redistribute operator towards the target topology.
    """
class Redistribute(ComputationalGraphNodeGenerator):
    """Node generator generating redistribute operators.

    For every variable, one redistribute operator is generated from the
    best candidate source topology to the target topology, unless the
    target topology already belongs to the candidate source topologies.
    """

    # Implemented redistribute operators: keys are classes that inherit
    # hysop.core.mpi.redistribute.RedistributeOperatorBase, values are the
    # operator priority (a smaller value has more priority).
    __redistribute_operators = {
        RedistributeIntra: 0,
        RedistributeInter: 1,
        # 2: RedistributeOverlap,
    }

    # Sanity check performed once, while the class body is being evaluated.
    for cls in __redistribute_operators.keys():
        assert issubclass(
            cls, RedistributeOperatorBase
        ), f"{cls} is not a RedistributeOperatorBase."

    def __new__(
        cls,
        variables,
        source_topos,
        target_topo,
        components=None,
        name=None,
        pretty_name=None,
        base_kwds=None,
        **kwds,
    ):
        """Create the generator instance.

        Only ``name``, ``pretty_name`` and ``base_kwds`` are forwarded to the
        base class ``__new__``; the remaining arguments are handled by
        ``__init__``.
        """
        base_kwds = first_not_None(base_kwds, {})
        return super().__new__(
            cls,
            name=name,
            pretty_name=pretty_name,
            candidate_input_tensors=None,
            candidate_output_tensors=None,
            **base_kwds,
        )

    def __init__(
        self,
        variables,
        source_topos,
        target_topo,
        components=None,
        name=None,
        pretty_name=None,
        base_kwds=None,
        **kwds,
    ):
        """
        Initialize a Redistribute operator generator.

        Parameters
        ----------
        variables: :class:`~hysop.fields.continuous_field.Field` or
            array like of continuous fields.
            the continuous variables to be distributed
        source_topos: :class:`~hysop.topology.topology.Topology` or
            array like of topologies, or dict(field, topologies)
            candidate source mesh topologies (for each field the
            optimal source topology will be chosen)
        target_topo: :class:`~hysop.topology.topology.Topology` or
            dict(Field, Topology)
            target mesh topology for all variables (or per variable if
            a dictionary is passed)
        components: int, array like of ints or dict(Field, set of ints), optional
            Forwarded as ``components`` to the generated redistribute
            operators. A non-dict value is applied to every variable.
            Defaults to None.
        name: string
            prefix for generated operator names
        pretty_name: string
            pretty prefix for generated operator names
        base_kwds: dict, optional, defaults to None
            Base class keywords arguments.
            If None, an empty dict will be passed.
        kwds:
            Keywords arguments that will be passed towards implementation
            redistribute operator __init__.
        """
        assert "source_topo" not in kwds

        base_kwds = first_not_None(base_kwds, {})
        variables = to_tuple(variables)

        super().__init__(
            name=name,
            pretty_name=pretty_name,
            candidate_input_tensors=variables,
            candidate_output_tensors=variables,
            **base_kwds,
        )

        # format variables to a set of variables
        variables = to_set(variables)
        check_instance(variables, set, values=Field)

        # format source topos to a dict(Field, set(Topology))
        if isinstance(source_topos, dict):
            # Iterate over items (not bare keys) and build a new dict so that
            # the caller's dictionary is left untouched.
            source_topos = {
                field: (topos if isinstance(topos, set) else to_set(topos))
                for field, topos in source_topos.items()
            }
        else:
            # The same set of candidates is shared by every variable.
            source_topos = dict.fromkeys(variables, to_set(source_topos))
        check_instance(source_topos, dict, keys=Field, values=set)
        for topos in source_topos.values():
            check_instance(topos, set, values=(Topology, type(None)))

        # format target_topo to a dict(Field, Topology)
        if not isinstance(target_topo, dict):
            check_instance(target_topo, Topology, allow_none=True)
            target_topo = dict.fromkeys(variables, target_topo)
        check_instance(target_topo, dict, keys=Field, values=(Topology, type(None)))

        # format components to a dict(Field, set(int)|None)
        if not isinstance(components, dict):
            if components is not None:
                components = to_set(components)
            components = dict.fromkeys(variables, components)
        check_instance(components, dict, keys=Field)
        for comps in components.values():
            check_instance(comps, set, values=int, allow_none=True)

        self._variables = variables
        self._source_topos = source_topos
        self._target_topo = target_topo
        self._components = components
        self._kwds = kwds

    @debug
    def _generate(self):
        """Build and return the list of redistribute operators.

        Variables whose target topology is already one of their candidate
        source topologies are skipped.
        """
        nodes = []
        for var in self._variables:
            source_topos = self._source_topos[var]
            target_topo = self._target_topo[var]
            components = self._components[var]
            kwds = self._kwds.copy()

            # if source topology is destination topology there is nothing to be done
            if target_topo in source_topos:
                continue

            # else we find the most suitable source topology
            node = self._select_redistribute(
                variable=var,
                source_topos=source_topos,
                target_topo=target_topo,
                components=components,
                name=self.name,
                pretty_name=self.pretty_name,
                **kwds,
            )
            nodes.append(node)
        return nodes

    @staticmethod
    def _select_redistribute(variable, source_topos, target_topo, components, **kwds):
        """Return the highest priority operator over all candidate sources.

        Raises
        ------
        RedistributeNotImplementedError
            If no candidate source topology yields a compatible operator.
        """
        assert target_topo not in source_topos
        best_redis = None
        for source_topo in source_topos:
            redis = Redistribute._get_compatible_redistribute(
                variable, source_topo, target_topo, components, **kwds
            )
            best_redis = Redistribute._select_best_operator(best_redis, redis)

        if best_redis is None:
            msg = "Failed to find a suitable redistribute operator for variables {} "
            msg += "between sources topologies and destination topology.\n"
            msg = msg.format(variable.name)
            for i, st in enumerate(source_topos):
                msg += f"\n::CANDIDATE SOURCE TOPOLOGY {i}::\n"
                msg += str(st)
            msg += "\n::DESTINATION TOPOLOGY::\n" + str(target_topo)
            msg += "\n"
            raise RedistributeNotImplementedError(msg)

        return best_redis

    @staticmethod
    def _select_best_operator(lhs, rhs):
        """Return whichever of two operators has the smallest priority value.

        ``None`` stands for "no operator": the other argument is returned
        (possibly ``None`` as well). On equal priorities ``lhs`` is kept.
        """
        # select highest priority operator when there are more
        # than one candidate source topology
        if lhs is None:
            return rhs
        if rhs is None:
            return lhs

        check_instance(lhs, RedistributeOperatorBase)
        check_instance(rhs, RedistributeOperatorBase)
        lhs_priority = Redistribute.__redistribute_operators[type(lhs)]
        rhs_priority = Redistribute.__redistribute_operators[type(rhs)]
        if lhs_priority <= rhs_priority:
            return lhs
        else:
            return rhs

    @staticmethod
    def _get_compatible_redistribute(
        variable, source_topo, target_topo, components, **kwds
    ):
        """Instantiate the first registered operator able to redistribute.

        Operator classes are tried by increasing priority value; the first
        one whose ``can_redistribute`` accepts the topology pair is
        instantiated and returned. Returns ``None`` when none is compatible.
        """
        # look from highest priority operator to smallest priority operators
        # if nothing is found return None
        for cls, _ in sorted(
            Redistribute.__redistribute_operators.items(), key=lambda x: x[1]
        ):
            if cls.can_redistribute(
                source_topo=source_topo, target_topo=target_topo, **kwds
            ):
                return cls(
                    variable=variable,
                    source_topo=source_topo,
                    target_topo=target_topo,
                    components=components,
                    **kwds,
                )
        return None